[SPARK-32003][CORE] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost #28848
Conversation
// Shuffle files for hostA-exec should be lost
val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
assert(mapStatuses.count(_ != null) == 1)
Without the change this part fails.
Test build #124153 has finished for PR 28848 at commit
Build failure is unrelated. I have checked, and there are some other PRs with the same error:
Thanks @wypoon!
There are just some comments; otherwise it is fine.
retest this please
Test build #124160 has finished for PR 28848 at commit
cc @jiangxb1987
Thanks @attilapiros for the review and suggestions. Thanks @Ngone51 for reviewing as well.
Test build #124180 has finished for PR 28848 at commit
retest this please
Test build #124184 has finished for PR 28848 at commit
Test build #124225 has finished for PR 28848 at commit
retest this please
Test build #124226 has finished for PR 28848 at commit
@jiangxb1987 @squito can you please review?
@wypoon Just a reminder: please do not resolve conversations before we reach an agreement.
Test build #124300 has finished for PR 28848 at commit
Retest this please.
Test build #124305 has finished for PR 28848 at commit
Thank you for making a PR, @wypoon.
Test build #125863 has finished for PR 28848 at commit
LGTM
PySpark packaging tests fail with
but I don't understand what that means. Any help?
Check this out: https://issues.apache.org/jira/browse/SPARK-32303
@tgravescs @squito I made some small tweaks to the scaladoc comments and also had to rebase since you approved the change. It has been open for some time now for any additional feedback. Can you give it the once-over again and merge it?
Jenkins, retest this please
lgtm still. Since it's been a few days and there was a bit of flakiness before, I triggered one more set of tests just to make sure everything is still OK, and then will merge.
Test build #126279 has finished for PR 28848 at commit
I created #29182 for the backport to branch-2.4. |
@@ -548,6 +560,56 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
assert(mapStatus2(2).location.host === "hostB")
}

test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
nit. Do we need to describe the "when external shuffle service is used" assumption here, given that we have conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") at line 567?
Personally, I'm OK with this as is; I think it's OK for some of the details to be down in the test itself, to balance against a super-long name.
@dongjoon-hyun since you called this a nit, I'm assuming you're OK with me merging this anyhow, but if not let me know and I can submit a quick follow-up.
still LGTM
merged to master, thanks everyone!
@wypoon sorry, I mistakenly only merged this to master, but I should have also merged it to branch-3.0. Would you mind also opening a pr targeting branch-3.0? |
### What changes were proposed in this pull request?
The DecommissionWorkerSuite started becoming flaky, and it revealed a real regression. Recently closed #29211 necessitates remembering the decommissioning shortly beyond the removal of the executor. In addition to fixing this issue, ensure that DecommissionWorkerSuite continues to pass when executors haven't had a chance to exit eagerly; that is, the old behavior before #29211 also still works. Added some more tests to TaskSchedulerImpl to ensure that the decommissioning information is indeed purged after a timeout. Hardened the test DecommissionWorkerSuite to make it wait for successful job completion.

### Why are the changes needed?
First, let me describe the intended behavior of decommissioning: if a fetch failure happens where the source executor was decommissioned, we want to treat that as an eager signal to clear all shuffle state associated with that executor. In addition, if we know that the host was decommissioned, we want to forget about all map statuses from all other executors on that decommissioned host. This is what the test "decommission workers ensure that fetch failures lead to rerun" is trying to test. This invariant is important to ensure that decommissioning a host does not lead to multiple fetch failures that might fail the job. This fetch failure can happen before the executor is truly marked "lost" because of heartbeat delays.
- However, #29211 eagerly exits the executors when they are done decommissioning. This removal of the executor was racing with the fetch failure. By the time the fetch failure is triggered, the executor is already removed and thus has forgotten its decommissioning information. (I tested this by delaying the decommissioning.) The fix is to keep the decommissioning information around for some time after removal, with some extra logic to finally purge it after a timeout.
- In addition, the executor loss can also bump up `shuffleFileLostEpoch` (added in #28848). This happens because when the executor is lost, it forgets the shuffle state about just that executor and increments the `shuffleFileLostEpoch`. This incrementing precludes the clearing of state of the entire host when the fetch failure happens, because the failed task is still reusing the old epoch. The fix here is also simple: ignore the `shuffleFileLostEpoch` when the shuffle status is being cleared due to a fetch failure resulting from host decommission.

I am strategically making both of these fixes very local to decommissioning to avoid other regressions. Especially the version stuff is tricky (it hasn't been fundamentally changed since it was first introduced in 2013).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually ran DecommissionWorkerSuite several times using a script and ensured it all passed.

### (Internal) Configs added
I added two configs, one of which is sort of meant for testing only:
- `spark.test.executor.decommission.initial.sleep.millis`: Initial delay by the decommissioner shutdown thread. Default is the same as before, 1 second. This is used for testing only. This one is kept "hidden" (i.e. not added as a constant, to avoid config bloat).
- `spark.executor.decommission.removed.infoCacheTTL`: Number of seconds to keep the removed executors' decom entries around. It defaults to 5 minutes. It should be around the average time it takes for all of the shuffle data to be fetched from the mapper to the reducer, but I think that can take a while since the reducers also do a multistep sort.

Closes #29422 from agrawaldevesh/decom_fixes.
Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
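The TTL-based retention of decommission info described above can be sketched roughly as follows. This is a minimal illustration only: the class and method names here are hypothetical, and the real `TaskSchedulerImpl` logic is more involved (the sketch just shows keeping removed-executor entries around until a timeout so a late fetch failure can still see that the executor was decommissioning).

```scala
import scala.collection.mutable

// Hypothetical sketch: remember that an executor was decommissioned for
// ttlMillis after its removal, then purge the entry. The clock is injectable
// so the expiry behavior can be tested deterministically.
class DecomInfoCache(ttlMillis: Long, clock: () => Long = () => System.currentTimeMillis()) {
  // executor id -> time at which the executor was removed
  private val removed = mutable.Map[String, Long]()

  def onExecutorRemoved(execId: String): Unit =
    removed(execId) = clock()

  // True if the executor was decommissioned and its entry has not yet expired.
  def wasRecentlyDecommissioned(execId: String): Boolean = {
    purgeExpired()
    removed.contains(execId)
  }

  private def purgeExpired(): Unit = {
    val now = clock()
    val expired = removed.collect { case (id, t) if now - t > ttlMillis => id }
    removed --= expired
  }
}
```

With a fake clock, an entry survives queries within the TTL and disappears once the TTL elapses, which mirrors the purge-after-timeout behavior the PR description asks the new TaskSchedulerImpl tests to verify.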
… outputs for executor on fetch failure after executor is lost

If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor, but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by rights, should unregister its outputs. It doesn't, because the epoch used to track the executor failure has not increased.

We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

Without the changes, the loss of a node could require two stage attempts to recover instead of one. No user-facing change. Tested with a new unit test; this test fails without the change and passes with it.

Closes #28848 from wypoon/SPARK-32003.
Authored-by: Wing Yew Poon <wypoon@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
### What changes were proposed in this pull request?
If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor, but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files. In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by rights, should unregister its outputs. It doesn't, because the epoch used to track the executor failure has not increased.

We track the epoch for failed executors that result in lost file output separately, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.

### Why are the changes needed?
Without the changes, the loss of a node could require two stage attempts to recover instead of one.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
New unit test. This test fails without the change and passes with it.
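The second-epoch idea can be illustrated with a minimal sketch. All names here are hypothetical, not Spark's actual fields; the real `DAGScheduler` keys this off executor-lost events and fetch-failure handling. The point is that a per-executor "file lost" epoch lets a later fetch failure in the same stage still trigger unregistering the outputs, as long as the global epoch has moved past the epoch at which that executor's files were last treated as lost.

```scala
import scala.collection.mutable

// Hypothetical sketch of tracking a separate epoch for lost shuffle files.
class FileLostEpochTracker {
  private var currentEpoch: Long = 0L
  // epoch at which we last treated this executor's shuffle files as lost
  private val shuffleFileLostEpoch = mutable.Map[String, Long]()

  // Advance the global epoch, e.g. when new failure information arrives.
  def newEpoch(): Unit = currentEpoch += 1

  // Returns true if the executor's outputs should be unregistered now:
  // only skip when files were already handled at (or after) this epoch.
  def maybeUnregisterOutputs(execId: String): Boolean = {
    val alreadyHandled =
      shuffleFileLostEpoch.get(execId).exists(_ >= currentEpoch)
    if (!alreadyHandled) {
      shuffleFileLostEpoch(execId) = currentEpoch
    }
    !alreadyHandled
  }
}
```

A repeated failure report for the same executor within one epoch is a no-op, while a failure arriving after the epoch has advanced (the host-loss-after-executor-loss case in this PR) is handled again rather than being dropped.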